package com.atguigu.flink.chapter02_DataStreamAPI.agg;

import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Created by Smexy on 2022/10/22
 *
 *      Computes the maximum water level ("vc") for each sensor id.
 *
 *      Lines of comma-separated text are read from a socket, converted to
 *      {@link WaterSensor} records, keyed by sensor id, aggregated with
 *      {@code max("vc")}, and the results are printed.
 */
public class Demo17_Max
{
    /** Number of leading comma-separated fields a line must provide. */
    private static final int REQUIRED_FIELDS = 3;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be executed or fails while running;
     *                   propagated so the failure is not reduced to a printed
     *                   stack trace followed by a normal return
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
           .socketTextStream("hadoop103", 8888)
           .map(new WaterSensorParser())
           .keyBy(WaterSensor::getId)
           /*
                Roughly equivalent to:

                select
                    id, max(vc), constant (the other fields keep the values of the first record)
                from xxx
                group by id
            */
           .max("vc")
           .print();

        env.execute();
    }

    /**
     * Converts one comma-separated text line into a {@link WaterSensor}.
     *
     * The first field is passed through unchanged as the first constructor
     * argument; the second is parsed as a {@code Long} and the third as an
     * {@code Integer}. Presumably these are id, timestamp and water level --
     * confirm against the WaterSensor constructor.
     */
    private static final class WaterSensorParser implements MapFunction<String, WaterSensor>
    {
        private static final long serialVersionUID = 1L;

        /**
         * @param value one raw input line
         * @return the parsed sensor record
         * @throws IllegalArgumentException if the line has fewer than three fields;
         *         a {@code NumberFormatException} (a subclass) is thrown when a
         *         numeric field cannot be parsed
         */
        @Override
        public WaterSensor map(String value) throws Exception {
            String[] fields = value.split(",");

            // Fail with the offending line in the message instead of a bare
            // ArrayIndexOutOfBoundsException. Extra trailing fields are ignored.
            if (fields.length < REQUIRED_FIELDS) {
                throw new IllegalArgumentException(
                    "Expected at least " + REQUIRED_FIELDS
                        + " comma-separated fields but got " + fields.length
                        + " in line: \"" + value + "\"");
            }

            // Only the numeric fields are trimmed, so input such as "s1, 1000, 10"
            // parses; the first field is left as-is so existing keys do not change.
            return new WaterSensor(
                fields[0],
                Long.valueOf(fields[1].trim()),
                Integer.valueOf(fields[2].trim())
            );
        }
    }
}
